feat: stream detour results per-station via NDJSON#21
Conversation
Replace batch-then-JSON detour API with NDJSON streaming. Each station's detour appears on the map and in the list as soon as its Valhalla calls finish, instead of waiting for the whole batch. - API returns application/x-ndjson, one JSON line per station - Client reads the stream with getReader(), debounce-flushes to React state every 150ms to avoid excessive re-renders - Corridor slider disabled while detours stream to prevent restarts - Bump version to 1.3.0
📝 WalkthroughWalkthroughBumped package version to 1.3.0. The detour API now validates coordinates, increases station limits, and streams per-station NDJSON results; the client consumes and incrementally applies NDJSON with debounced state updates; corridor distance input is disabled and restyled while detours load. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
- Remove .max(50) from detour schema — streaming handles any count via the concurrency queue, and route-stations can return up to 5000 - Hoist flushTimer so cleanup cancels it; flush() checks aborted before writing state, preventing stale detours leaking into the next route - Late stream errors only mark unseen stations as -1, keeping already-flushed successes instead of wiping the whole map - Sync package-lock.json to 1.3.0
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/app/api/route-detour/route.ts`:
- Around line 60-62: The processStation function returns objects of shape { id:
string; detourMin: number } that are serialized and enqueued without runtime
validation; add a Zod schema (e.g., const stationResponseSchema = z.object({ id:
z.string(), detourMin: z.number() })) and use stationResponseSchema.parse(...)
on the object returned from processStation (or right before the enqueue() call
referenced at enqueue() around line 124) to validate the response payload;
ensure the new schema is imported/defined alongside stationSchema/bodySchema and
throw or handle parse errors so only validated objects are enqueued.
- Around line 85-95: The handler currently ignores request.signal and continues
dequeuing/processing detours; update the detour loop to check
request.signal.aborted before calling dequeue and before each expensive call,
and pass that signal into getRouteDuration (e.g., getRouteDuration(..., {
signal: request.signal })) so in-flight work can be aborted; additionally
validate any streamed response objects with your Zod schema (the same schema
used for API responses) before encoding/queueing them to the stream to ensure
only validated objects are emitted (apply these changes around the detour
processing loop and calls to getRouteDuration and the stream encoder).
In `@src/components/home-client.tsx`:
- Around line 280-350: The scheduled flush timer (flushTimer) isn't cleared on
abort/error paths causing stale pending to be merged later; ensure you
clearTimeout(flushTimer) and set flushTimer = null before any early return or
when handling errors (e.g., inside the AbortError branch and the catch block)
and also clear it in the finally/cleanup path after the reader loop so flush()
cannot run after controller.abort() — update usages in the async IIFE that
reference reader.read(), flush(), scheduleFlush(), flushTimer, controller, and
setDetourMap to always clear the timer (and null it) before returning or
rethrowing.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 2f301c9c-d6ea-41a6-a750-0803d56539d3
📒 Files selected for processing (4)
package.jsonsrc/app/api/route-detour/route.tssrc/components/home-client.tsxsrc/components/search/search-panel.tsx
| const [detourDuration, baselineDuration] = await Promise.all([ | ||
| getRouteDuration([ | ||
| { lat: exitCoord[1], lon: exitCoord[0] }, | ||
| { lat: s.lat, lon: s.lon }, | ||
| { lat: rejoinCoord[1], lon: rejoinCoord[0] }, | ||
| ]), | ||
| getRouteDuration([ | ||
| { lat: exitCoord[1], lon: exitCoord[0] }, | ||
| { lat: rejoinCoord[1], lon: rejoinCoord[0] }, | ||
| ]), | ||
| ]); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
wc -l src/app/api/route-detour/route.tsRepository: GeiserX/Pumperly
Length of output: 97
🏁 Script executed:
cat -n src/app/api/route-detour/route.ts | head -150Repository: GeiserX/Pumperly
Length of output: 5600
Abort queued detours when the request is canceled.
The handler ignores request.signal, so canceled requests continue draining the queue and triggering upstream Valhalla work. With up to 50 stations and 2 calls per station, a canceled request can spawn ~100 unnecessary calls. Check request.signal.aborted before dequeuing (line 121), and pass the signal to getRouteDuration() if it accepts abort handling.
Additionally, the streamed response objects must be validated with Zod before encoding (line 124), per the API validation requirement.
Also applies to: 115–129
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/app/api/route-detour/route.ts` around lines 85 - 95, The handler
currently ignores request.signal and continues dequeuing/processing detours;
update the detour loop to check request.signal.aborted before calling dequeue
and before each expensive call, and pass that signal into getRouteDuration
(e.g., getRouteDuration(..., { signal: request.signal })) so in-flight work can
be aborted; additionally validate any streamed response objects with your Zod
schema (the same schema used for API responses) before encoding/queueing them to
the stream to ensure only validated objects are emitted (apply these changes
around the detour processing loop and calls to getRouteDuration and the stream
encoder).
- Cap stations array at 500 (route-stations returns max 5000 but real corridor results are far lower) - Validate lon/lat bounds (-180..180, -90..90) and routeFraction (0..1) matching the rest of the routing surface - Observe request.signal: drain the queue on client disconnect so workers stop picking up new stations, and skip enqueue if aborted
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/components/home-client.tsx (1)
276-278:⚠️ Potential issue | 🔴 CriticalAbort handler clears the queue but doesn't cancel in-flight Valhalla calls.
The signal IS checked and the queue IS cleared on abort (lines 122–124, 133), but the check happens after
processStationcompletes. AnygetRouteDuration()call already in flight will finish, even after the route change. With the cap removed, abandoned requests continue wasting upstream calls. Pass the signal togetRouteDurationorprocessStationto allow cancellation of in-flight requests.Also applies to: 288-301, 366-368
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/components/home-client.tsx` around lines 276 - 278, The abort handler creates an AbortController (detourAbortRef) and clears the queue but doesn’t cancel in-flight Valhalla calls because getRouteDuration/processStation are not given the signal; update calls that trigger network work (calls to getRouteDuration and processStation) to accept and propagate an AbortSignal (pass controller.signal from the AbortController created where detourAbortRef.current is set), and ensure getRouteDuration (and any fetch/axios logic it uses) respects the signal and throws/returns early on abort so in-flight requests are cancelled; update all similar call sites (including the blocks around the other noted ranges) to pass the signal through.
🧹 Nitpick comments (1)
src/components/home-client.tsx (1)
14-16: RenameDETOUR_FLUSH_MSto camelCase.
DETOUR_FLUSH_MSbreaks the repo naming rule for TS variables.detourFlushMswould match the project convention.As per coding guidelines, "Use camelCase for variable names in TypeScript/JavaScript files".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/components/home-client.tsx` around lines 14 - 16, The constant DETOUR_FLUSH_MS uses SCREAMING_SNAKE_CASE but must be renamed to camelCase; rename the identifier DETOUR_FLUSH_MS to detourFlushMs in src/components/home-client.tsx and update every usage/reference in the same module (and any tests or other modules that import or reference it) to the new name, preserving the numeric value (150) and keeping the comment intact; ensure TypeScript/ESLint passes after renaming.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/components/home-client.tsx`:
- Around line 339-353: At EOF, the normal completion path currently flushes
pending results but doesn't mark eligible stations that were never seen as
failed; update the EOF handling in the same block that clears flushTimer and
calls flush() to backfill any ids in eligibleIds that are not in seen into
pending with a failure value (e.g., -1) before calling scheduleFlush()/flush(),
so any unseen ids are resolved; locate the variables pending, seen, eligibleIds,
scheduleFlush, flushTimer, controller.signal.aborted, and flush() in the
function and add the backfill step (pending[id] = -1; seen.add(id)) for each id
in eligibleIds minus seen before the final flush.
---
Outside diff comments:
In `@src/components/home-client.tsx`:
- Around line 276-278: The abort handler creates an AbortController
(detourAbortRef) and clears the queue but doesn’t cancel in-flight Valhalla
calls because getRouteDuration/processStation are not given the signal; update
calls that trigger network work (calls to getRouteDuration and processStation)
to accept and propagate an AbortSignal (pass controller.signal from the
AbortController created where detourAbortRef.current is set), and ensure
getRouteDuration (and any fetch/axios logic it uses) respects the signal and
throws/returns early on abort so in-flight requests are cancelled; update all
similar call sites (including the blocks around the other noted ranges) to pass
the signal through.
---
Nitpick comments:
In `@src/components/home-client.tsx`:
- Around line 14-16: The constant DETOUR_FLUSH_MS uses SCREAMING_SNAKE_CASE but
must be renamed to camelCase; rename the identifier DETOUR_FLUSH_MS to
detourFlushMs in src/components/home-client.tsx and update every usage/reference
in the same module (and any tests or other modules that import or reference it)
to the new name, preserving the numeric value (150) and keeping the comment
intact; ensure TypeScript/ESLint passes after renaming.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: c7d58db7-74bf-47ca-9acf-1d68545137ec
⛔ Files ignored due to path filters (1)
package-lock.jsonis excluded by!**/package-lock.json,!**/package-lock.json
📒 Files selected for processing (2)
src/app/api/route-detour/route.tssrc/components/home-client.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
- src/app/api/route-detour/route.ts
src/components/home-client.tsx
Outdated
| for (const line of lines) { | ||
| if (!line) continue; | ||
| try { | ||
| const { id, detourMin } = JSON.parse(line); | ||
| pending[id] = detourMin; | ||
| seen.add(id); | ||
| } catch { /* skip malformed line */ } | ||
| } | ||
|
|
||
| setDetourMap((prev) => { | ||
| const next = { ...prev }; | ||
| for (const d of data.detours) next[d.id] = d.detourMin; | ||
| return next; | ||
| }); | ||
| } catch (err) { | ||
| if (err instanceof DOMException && err.name === "AbortError") return; | ||
| // Network/parse failures: mark batch as failed so stations | ||
| // don't pass maxDetour as "unknown" after loading finishes | ||
| setDetourMap((prev) => { | ||
| const next = { ...prev }; | ||
| for (const f of batch) next[f.properties.id] = -1; | ||
| return next; | ||
| }); | ||
| if (Object.keys(pending).length > 0) scheduleFlush(); | ||
| } | ||
|
|
||
| // Flush remaining results | ||
| if (flushTimer != null) { clearTimeout(flushTimer); flushTimer = null; } | ||
| if (!controller.signal.aborted && Object.keys(pending).length > 0) flush(); |
There was a problem hiding this comment.
Mark unseen stations as failed after a clean EOF.
The catch path already backfills eligibleIds - seen, but the normal completion path does not. If the stream closes early after a server-side error, or one NDJSON line is malformed, those stations stay unresolved forever instead of becoming -1.
Suggested fix
// Flush remaining results
if (flushTimer != null) { clearTimeout(flushTimer); flushTimer = null; }
if (!controller.signal.aborted && Object.keys(pending).length > 0) flush();
+ if (!controller.signal.aborted) {
+ const failed: Record<string, number> = {};
+ for (const id of eligibleIds) {
+ if (!seen.has(id)) failed[id] = -1;
+ }
+ if (Object.keys(failed).length > 0) {
+ setDetourMap((prev) => ({ ...prev, ...failed }));
+ }
+ }
} catch (err) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/components/home-client.tsx` around lines 339 - 353, At EOF, the normal
completion path currently flushes pending results but doesn't mark eligible
stations that were never seen as failed; update the EOF handling in the same
block that clears flushTimer and calls flush() to backfill any ids in
eligibleIds that are not in seen into pending with a failure value (e.g., -1)
before calling scheduleFlush()/flush(), so any unseen ids are resolved; locate
the variables pending, seen, eligibleIds, scheduleFlush, flushTimer,
controller.signal.aborted, and flush() in the function and add the backfill step
(pending[id] = -1; seen.add(id)) for each id in eligibleIds minus seen before
the final flush.
- Backfill unseen station IDs as -1 on both clean EOF and error, so they can't bypass the detour filter as "unknown" after loading - Client-side chunking in groups of 500 reconciles the cap mismatch between route-stations (5000) and route-detour (500) - getRouteDuration() accepts optional AbortSignal, combined with its 5s timeout via AbortSignal.any(). route-detour passes request.signal so in-flight Valhalla calls abort when the client disconnects - Validate detour result with Zod before enqueue - Rename DETOUR_FLUSH_MS/DETOUR_CHUNK_SIZE to camelCase - Add JSDoc to streaming endpoint
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/lib/valhalla.ts`:
- Around line 139-141: The abort/timeout logic using signal ?
AbortSignal.any([signal, AbortSignal.timeout(5000)]) : AbortSignal.timeout(5000)
can cause callers to receive an AbortError (reject) instead of the function
returning null as declared; update the network/streaming call that uses this
signal to catch aborts and normalize them to null: wrap the await that uses this
signal (the fetch/stream call in this module that references the signal /
AbortSignal.any / AbortSignal.timeout) in a try/catch, detect an abort via
error.name === "AbortError" (or error instanceof DOMException where name ===
"AbortError"), return null for that case, and rethrow other errors so behavior
matches the declared return contract. Ensure the signal construction remains the
same but that AbortErrors are handled locally and converted to null.
- Around line 139-141: The project uses AbortSignal.any (and
AbortSignal.timeout) in src/lib/valhalla.ts which requires Node ≥19; add an
engines.node entry to package.json (e.g., "node": ">=19.0.0" or ">=22.0.0" to
match CI) so local dev reproduces the CI runtime, update package.json
accordingly and communicate this requirement in README or contributor docs;
ensure package manager respects engines (e.g., npm/yarn) if you want
enforcement.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 2298fedb-b6a9-43d8-8080-68ceff89a231
📒 Files selected for processing (3)
src/app/api/route-detour/route.tssrc/components/home-client.tsxsrc/lib/valhalla.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- src/components/home-client.tsx
- src/app/api/route-detour/route.ts
| signal: signal | ||
| ? AbortSignal.any([signal, AbortSignal.timeout(5000)]) | ||
| : AbortSignal.timeout(5000), |
There was a problem hiding this comment.
Abort paths now likely reject the promise instead of returning null.
With caller-provided aborts, this function will reject more often on AbortError; that can terminate the stream path unless every caller catches it. Consider normalizing abort/timeout to null to match the declared return contract.
Proposed fix
export async function getRouteDuration(
locations: { lat: number; lon: number }[],
costing: string = "auto",
signal?: AbortSignal,
): Promise<number | null> {
if (!VALHALLA_URL) return null;
- const res = await fetch(`${VALHALLA_URL}/route`, {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify({
- locations: locations.map((l) => ({ lat: l.lat, lon: l.lon })),
- costing,
- directions_options: { units: "kilometers" },
- directions_type: "none",
- }),
- signal: signal
- ? AbortSignal.any([signal, AbortSignal.timeout(5000)])
- : AbortSignal.timeout(5000),
- });
+ const composedSignal = signal
+ ? AbortSignal.any([signal, AbortSignal.timeout(5000)])
+ : AbortSignal.timeout(5000);
+
+ let res: Response;
+ try {
+ res = await fetch(`${VALHALLA_URL}/route`, {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ locations: locations.map((l) => ({ lat: l.lat, lon: l.lon })),
+ costing,
+ directions_options: { units: "kilometers" },
+ directions_type: "none",
+ }),
+ signal: composedSignal,
+ });
+ } catch (error) {
+ if (error instanceof DOMException && error.name === "AbortError") return null;
+ throw error;
+ }
if (!res.ok) return null;
const data: { trip: ValhallaTrip } = await res.json();
return data.trip.summary.time;
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lib/valhalla.ts` around lines 139 - 141, The abort/timeout logic using
signal ? AbortSignal.any([signal, AbortSignal.timeout(5000)]) :
AbortSignal.timeout(5000) can cause callers to receive an AbortError (reject)
instead of the function returning null as declared; update the network/streaming
call that uses this signal to catch aborts and normalize them to null: wrap the
await that uses this signal (the fetch/stream call in this module that
references the signal / AbortSignal.any / AbortSignal.timeout) in a try/catch,
detect an abort via error.name === "AbortError" (or error instanceof
DOMException where name === "AbortError"), return null for that case, and
rethrow other errors so behavior matches the declared return contract. Ensure
the signal construction remains the same but that AbortErrors are handled
locally and converted to null.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Node/TypeScript declarations =="
fd -a '^package.json$' --exec sh -c '
echo "--- $1"
jq -r ".engines.node // \"engines.node not set\"" "$1"
jq -r ".devDependencies.typescript // .dependencies.typescript // \"typescript not declared\"" "$1"
' sh {} \;
echo
echo "== tsconfig lib/target settings =="
fd -a -i 'tsconfig*.json' --exec sh -c '
echo "--- $1"
jq -r "{target: .compilerOptions.target, lib: .compilerOptions.lib}" "$1" 2>/dev/null || cat "$1"
' sh {} \;
echo
echo "== AbortSignal.any usage sites =="
rg -n --type=ts --type=tsx -C2 'AbortSignal\.any\('Repository: GeiserX/Pumperly
Length of output: 396
🏁 Script executed:
cat -n src/lib/valhalla.ts | sed -n '135,145p'Repository: GeiserX/Pumperly
Length of output: 448
🏁 Script executed:
# Check for Node version constraints in other files
fd -a -e '.nvmrc' -e '.node-version' -e '.tool-versions' | xargs cat 2>/dev/null || echo "No .nvmrc/.node-version found"Repository: GeiserX/Pumperly
Length of output: 42
🏁 Script executed:
# Check if there's a .github/workflows or similar CI configs that specify Node version
fd -a -i '*.yml' -o -i '*.yaml' | head -20 | xargs grep -l "node.*version" 2>/dev/null | head -5Repository: GeiserX/Pumperly
Length of output: 176
🏁 Script executed:
find .github/workflows -type f \( -name "*.yml" -o -name "*.yaml" \) 2>/dev/null | xargs grep -i "node.*version" 2>/dev/null || echo "No Node version in workflows"Repository: GeiserX/Pumperly
Length of output: 251
🏁 Script executed:
# Check TypeScript version's dom lib capability for AbortSignal.any
# Confirm if esnext or dom.iterable includes AbortSignal.any definitions
node -e "
const tsVersion = require('./package.json').devDependencies.typescript || require('./package.json').dependencies.typescript;
console.log('TypeScript version in package.json:', tsVersion);
"Repository: GeiserX/Pumperly
Length of output: 102
🏁 Script executed:
# Check if AbortSignal.any() is even available - it requires Node 19+
# Let's see if there are any Node 18 or older version constraints elsewhere
rg -i "node.*18|node.*17|node.*16|node.*14" --type=json --type=yaml --type=yml --type=mdRepository: GeiserX/Pumperly
Length of output: 87
Add engines.node to package.json to ensure local development aligns with CI runtime.
Node 22 in CI (.github/workflows/ci.yml) fully supports AbortSignal.any() (available since Node 19.0.0), and TypeScript 6.0.2 has complete type definitions via the dom lib. However, package.json lacks an engines.node declaration, creating a mismatch between CI and local development environments. Developers using older Node versions would encounter runtime errors. Set engines.node to at least "19.0.0" or "22.0.0" to match CI.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lib/valhalla.ts` around lines 139 - 141, The project uses AbortSignal.any
(and AbortSignal.timeout) in src/lib/valhalla.ts which requires Node ≥19; add an
engines.node entry to package.json (e.g., "node": ">=19.0.0" or ">=22.0.0" to
match CI) so local dev reproduces the CI runtime, update package.json
accordingly and communicate this requirement in README or contributor docs;
ensure package manager respects engines (e.g., npm/yarn) if you want
enforcement.
Summary
ReadableStream.getReader(), debounce-flushing to React state every 150ms to avoid excessive re-rendersChanges
src/app/api/route-detour/route.tsapplication/x-ndjsonstream instead of collected JSONsrc/components/home-client.tsxsrc/components/search/search-panel.tsxpackage.jsonTest plan
Summary by CodeRabbit
Chores
New Features
Bug Fixes